package com.xueyuan.wata.daph.node.flink117.string.general.connector

import com.xueyuan.wata.daph.flink117.api.node.string.StringMultipleOutput
import com.xueyuan.wata.daph.flink117.utils.CatalogUtil
import com.xueyuan.wata.daph.node.flink117.string.general.{GeneralMultipleOutputConfig, GeneralNode}

class GeneralMultipleOutput extends StringMultipleOutput with GeneralNode {
  override def out(lineToDS: Map[String, String]): Unit = {
    val config = nodeConfig.asInstanceOf[GeneralMultipleOutputConfig]
    val databaseName = config.databaseName
    val sqls = config.sqls
    val order = config.order
    val ss = if (order) sqls.sortBy(_.order) else sqls

    // 设置SQL方言，使用catalog
    su(config)

    // 创建表
    ss.foreach { sql =>
      createTable(databaseName, sql.createSql, sql.resultSql)
    }

    CatalogUtil.logAndProduceCatalogDetails(tableEnv, config.catalogName)

    // 创建sqlSet，把sql添加到sqlSet
    val sqlSet = tableEnv.createStatementSet()
    ss.map(_.resultSql).foreach { sql =>
      sqlSet.addInsertSql(sql)
      logger.info(s"addInsertSql: $sql")
    }

    // 执行sqlSet，打印执行结果
    val res = sqlSet.execute()
    res.print()
  }

  override def getNodeConfigClass = classOf[GeneralMultipleOutputConfig]
}
